Skip to content

一、概述

Seata是 2019 年 1 月份蚂蚁金服和阿里巴巴共同开源的分布式事务解决方案。致力于提供高性能和简单易用的分布式事务服务,为用户打造一站式的分布式解决方案。

官网地址:http://seata.io/,其中的文档、播客中提供了大量的使用说明、源码分析。

Seata事务管理中有三个重要的角色:

  • TC (Transaction Coordinator) - **事务协调者:**维护全局和分支事务的状态,协调全局事务提交或回滚。

  • TM (Transaction Manager) - **事务管理器:**定义全局事务的范围、开始全局事务、提交或回滚全局事务。

  • RM (Resource Manager) - **资源管理器:**管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。

1.1 整体的架构

如图:

image-20210724172326452

Seata基于上述架构提供了四种不同的分布式事务解决方案:

  • XA模式:强一致性分阶段事务模式,牺牲了一定的可用性,无业务侵入
  • TCC模式:最终一致的分阶段事务模式,有业务侵入
  • AT模式:最终一致的分阶段事务模式,无业务侵入,也是Seata的默认模式
  • SAGA模式:长事务模式,有业务侵入

无论哪种方案,都离不开TC,也就是事务的协调者。

1.2 执行流程

  1. A服务的TM向TC申请开启一个全局事务,TC就会创建一个全局事务并返回一个唯一的XID
  2. A服务的RM向TC注册分支事务,并及其纳入XID对应全局事务的管辖
  3. A服务执行分支事务,向数据库做操作
  4. A服务开始远程调用B服务,此时XID会在微服务的调用链上传播
  5. B服务的RM向TC注册分支事务,并将其纳入XID对应的全局事务的管辖
  6. B服务执行分支事务,向数据库做操作
  7. 全局事务调用链处理完毕,TM根据有无异常向TC发起全局事务的提交或者回滚
  8. TC协调其管辖之下的所有分支事务, 决定是否回滚

1.3 运行流程分析

1、每个RM使用DataSourceProxy连接数据库,其目的是使用ConnectionProxy,使用数据源和数据连接代理的目的就是在第一阶段将undo_log和业务数据放在一个本地事务提交,这样就保存了只要有业务操作就一定有undo_log。

2、在第一阶段undo_log中存放了数据修改前和修改后的值,为事务回滚作好准备,所以第一阶段完成就已经将分支事务提交,也就释放了锁资源。

3、TM开启全局事务开始,将XID全局事务id放在事务上下文中,通过feign调用也将XID传入下游分支事务,每个分支事务将自己的Branch ID分支事务ID与XID关联。

4、第二阶段全局事务提交,TC会通知各各分支参与者提交分支事务,在第一阶段就已经提交了分支事务,这里各各参与者只需要删除undo_log即可,并且可以异步执行,第二阶段很快可以完成。

5、第二阶段全局事务回滚,TC会通知各各分支参与者回滚分支事务,通过 XID 和 Branch ID 找到相应的回滚日志,通过回滚日志生成反向的 SQL 并执行,以完成分支事务回滚到之前的状态,如果回滚失败则会重试回滚操作

二、应用示例

2.1 与springcloud集成

依赖

xml
<!--seata-->
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
    <exclusions>
        <!--版本较低,1.3.0,因此排除--> 
        <exclusion>
            <artifactId>seata-spring-boot-starter</artifactId>
            <groupId>io.seata</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-spring-boot-starter</artifactId>
    <!--seata starter 采用1.4.2版本-->
    <version>${seata.version}</version>
</dependency>

配置

yml
seata:
  registry: # TC服务注册中心的配置,微服务根据这些信息去注册中心获取tc服务地址
    type: nacos # 注册中心类型 nacos
    nacos:
      server-addr: 127.0.0.1:8848 # nacos地址
      namespace: "" # namespace,默认为空
      group: DEFAULT_GROUP # 分组,默认是DEFAULT_GROUP
      application: seata-tc-server # seata服务名称
      username: nacos
      password: nacos
  tx-service-group: seata-demo # 事务组名称
  service:
    vgroup-mapping: # 事务组与cluster的映射关系
      seata-demo: SH

微服务是根据namespace@group@application#service去确定服务集群,在nacos中拉取对应的示例信息

三、核心知识

3.1 XA模型

Seata对原始的XA模式做了简单的封装和改造,以适应自己的事务模型,基本架构如图:

image-20210724174424070

RM一阶段的工作:

​ ① 注册分支事务到TC

​ ② 执行分支业务sql但不提交

​ ③ 报告执行状态到TC

TC二阶段的工作:

  • TC检测各分支事务执行状态

    a.如果都成功,通知所有RM提交事务

    b.如果有失败,通知所有RM回滚事务

RM二阶段的工作:

  • 接收TC指令,提交或回滚事务

优缺点

优点:

  • 事务的强一致性,满足ACID原则。
  • 常用数据库都支持,实现简单,并且没有代码侵入

缺点:

  • 因为一阶段需要锁定数据库资源,等待二阶段结束才释放,性能较差
  • 依赖关系型数据库实现事务

实现XA模式

Seata的starter已经完成了XA模式的自动装配,实现非常简单,步骤如下:

1)修改application.yml文件(每个参与事务的微服务),开启XA模式:

yaml
seata:
  data-source-proxy-mode: XA

2)给发起全局事务的入口方法添加@GlobalTransactional注解:

3.2 AT模式

实现AT模式

AT模式中的快照生成、回滚等动作都是由框架自动完成,没有任何代码侵入,因此实现非常简单。

只不过,AT模式需要一个表来记录全局锁、另一张表来记录数据快照undo_log。

1)导入数据库表,记录全局锁

导入提供的Sql文件:seata-at.sql,其中lock_table导入到TC服务关联的数据库,undo_log表导入到微服务关联的数据库:

2)修改application.yml文件,将事务模式修改为AT模式即可:

yaml
seata:
  data-source-proxy-mode: AT # 默认就是AT

3.3 TCC模式

实现TCC模式

解决空回滚和业务悬挂问题,必须要记录当前事务状态,

1)思路分析

这里我们定义一张表:

sql
CREATE TABLE `account_freeze_tbl` (
  `xid` varchar(128NOT NULL,
  `user_id` varchar(255DEFAULT NULL COMMENT '用户id',
  `freeze_money` int(11) unsigned DEFAULT '0' COMMENT '冻结金额',
  `state` int(1DEFAULT NULL COMMENT '事务状态,0:try,1:confirm,2:cancel',
  PRIMARY KEY (`xid`USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=COMPACT;

其中:

  • xid:是全局事务id
  • freeze_money:用来记录用户冻结金额
  • state:用来记录事务状态

那此时,我们的业务开怎么做呢?

  • Try业务:
    • 记录冻结金额和事务状态到account_freeze表
    • 扣减account表可用金额
  • Confirm业务
    • 根据xid删除account_freeze表的冻结记录
  • Cancel业务
    • 修改account_freeze表,冻结金额为0,state为2
    • 修改account表,恢复可用金额
  • 如何判断是否空回滚
    • cancel业务中,根据xid查询account_freeze,如果为null则说明try还没做,需要空回滚
  • 如何避免业务悬挂
    • try业务中,根据xid查询account_freeze ,如果已经存在则证明Cancel已经执行,拒绝执行try业务

2)声明TCC接口

TCC的Try、Confirm、Cancel方法都需要在接口中基于注解来声明,

我们在account-service项目中的cn.itcast.account.service包中新建一个接口,声明TCC三个接口:

java
package cn.itcast.account.service;

import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;

@LocalTCC
public interface AccountTCCService {

    @TwoPhaseBusinessAction(name = "deduct", commitMethod = "confirm", rollbackMethod = "cancel")
    void deduct(@BusinessActionContextParameter(paramName = "userId") String userId,
                @BusinessActionContextParameter(paramName = "money")int money);

    boolean confirm(BusinessActionContext ctx);

    boolean cancel(BusinessActionContext ctx);
}

3)编写实现类

在account-service服务中的cn.itcast.account.service.impl包下新建一个类,实现TCC业务:

java
package cn.itcast.account.service.impl;

import cn.itcast.account.entity.AccountFreeze;
import cn.itcast.account.mapper.AccountFreezeMapper;
import cn.itcast.account.mapper.AccountMapper;
import cn.itcast.account.service.AccountTCCService;
import io.seata.core.context.RootContext;
import io.seata.rm.tcc.api.BusinessActionContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
@Slf4j
public class AccountTCCServiceImpl implements AccountTCCService {

    @Autowired
    private AccountMapper accountMapper;
    @Autowired
    private AccountFreezeMapper freezeMapper;

    @Override
    @Transactional
    public void deduct(String userId, int money) {
        // 0.获取事务id
        String xid = RootContext.getXID();
        // 1.扣减可用余额
        accountMapper.deduct(userId, money);
        // 2.记录冻结金额,事务状态
        AccountFreeze freeze = new AccountFreeze();
        freeze.setUserId(userId);
        freeze.setFreezeMoney(money);
        freeze.setState(AccountFreeze.State.TRY);
        freeze.setXid(xid);
        freezeMapper.insert(freeze);
    }

    @Override
    public boolean confirm(BusinessActionContext ctx) {
        // 1.获取事务id
        String xid = ctx.getXid();
        // 2.根据id删除冻结记录
        int count = freezeMapper.deleteById(xid);
        return count == 1;
    }

    @Override
    public boolean cancel(BusinessActionContext ctx) {
        // 0.查询冻结记录
        String xid = ctx.getXid();
        AccountFreeze freeze = freezeMapper.selectById(xid);

        // 1.恢复可用余额
        accountMapper.refund(freeze.getUserId(), freeze.getFreezeMoney());
        // 2.将冻结金额清零,状态改为CANCEL
        freeze.setFreezeMoney(0);
        freeze.setState(AccountFreeze.State.CANCEL);
        int count = freezeMapper.updateById(freeze);
        return count == 1;
    }
}

四、安装

4.1 常规安装

  • 下载地址;http😕/seata.io/zh-cn/blog/download.html

  • 修改registry.conf (从nacos中读取配置)

    properties
    registry {
      # tc服务的注册中心类,这里选择nacos,也可以是eureka、zookeeper等
      type = "nacos"
    
      nacos {
        # seata tc 服务注册到 nacos的服务名称,可以自定义
        application = "seata-tc-server"
        serverAddr = "127.0.0.1:8848"
        group = "DEFAULT_GROUP"
        namespace = ""
        cluster = "SH"
        username = "nacos"
        password = "nacos"
      }
    }
    
    config {
      # 读取tc服务端的配置文件的方式,这里是从nacos配置中心读取,这样如果tc是集群,可以共享配置
      type = "nacos"
      # 配置nacos地址等信息
      nacos {
        serverAddr = "127.0.0.1:8848"
        namespace = ""
        group = "SEATA_GROUP"
        username = "nacos"
        password = "nacos"
        dataId = "seataServer.properties"
      }
    }
  • nacos中配置

    properties
    # 数据存储方式,db代表数据库
    store.mode=db
    store.db.datasource=druid
    store.db.dbType=mysql
    store.db.driverClassName=com.mysql.jdbc.Driver
    store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true&rewriteBatchedStatements=true
    store.db.user=root
    store.db.password=123
    store.db.minConn=5
    store.db.maxConn=30
    store.db.globalTable=global_table
    store.db.branchTable=branch_table
    store.db.queryLimit=100
    store.db.lockTable=lock_table
    store.db.maxWait=5000
    # 事务、日志等配置
    server.recovery.committingRetryPeriod=1000
    server.recovery.asynCommittingRetryPeriod=1000
    server.recovery.rollbackingRetryPeriod=1000
    server.recovery.timeoutRetryPeriod=1000
    server.maxCommitRetryTimeout=-1
    server.maxRollbackRetryTimeout=-1
    server.rollbackRetryTimeoutUnlockEnable=false
    server.undo.logSaveDays=7
    server.undo.logDeletePeriod=86400000
    
    # 客户端与服务端传输方式
    transport.serialization=seata
    transport.compressor=none
    # 关闭metrics功能,提高性能
    metrics.enabled=false
    metrics.registryType=compact
    metrics.exporterList=prometheus
    metrics.exporterPrometheusPort=9898
  • 执行seata-tc-server.sql脚本

  • 启动服务。

4.2 TC高可用

启动多台服务,使cluster中配置不同。

在nacos中添加客户端配置

properties
# 事务组映射关系


<NolebasePageProperties />




service.vgroupMapping.seata-demo=SH

service.enableDegrade=false
service.disableGlobalTransaction=false
# 与TC服务的通信配置
transport.type=TCP
transport.server=NIO
transport.heartbeat=true
transport.enableClientBatchSendRequest=false
transport.threadFactory.bossThreadPrefix=NettyBoss
transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
transport.threadFactory.shareBossWorker=false
transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
transport.threadFactory.clientSelectorThreadSize=1
transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
transport.threadFactory.bossThreadSize=1
transport.threadFactory.workerThreadSize=default
transport.shutdown.wait=3
# RM配置
client.rm.asyncCommitBufferLimit=10000
client.rm.lock.retryInterval=10
client.rm.lock.retryTimes=30
client.rm.lock.retryPolicyBranchRollbackOnConflict=true
client.rm.reportRetryCount=5
client.rm.tableMetaCheckEnable=false
client.rm.tableMetaCheckerInterval=60000
client.rm.sqlParserType=druid
client.rm.reportSuccessEnable=false
client.rm.sagaBranchRegisterEnable=false
# TM配置
client.tm.commitRetryCount=5
client.tm.rollbackRetryCount=5
client.tm.defaultGlobalTransactionTimeout=60000
client.tm.degradeCheck=false
client.tm.degradeCheckAllowTimes=10
client.tm.degradeCheckPeriod=2000

# undo日志配置
client.undo.dataValidation=true
client.undo.logSerialization=jackson
client.undo.onlyCareUpdateColumns=true
client.undo.logTable=undo_log
client.undo.compress.enable=true
client.undo.compress.type=zip
client.undo.compress.threshold=64k
client.log.exceptionRate=100

客户端读取配置

yml
seata:
  config:
    type: nacos
    nacos:
      server-addr: 127.0.0.1:8848
      username: nacos
      password: nacos
      group: SEATA_GROUP
      data-id: client.properties

微服务连接放入tc集群,统一由nacos的client.properties来决定。